/**
 * Copyright (C) 2013 Flowroute LLC (flowroute.com)
 *
 * This file is part of Kamailio, a free SIP server.
 *
 * This file is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version
 *
 *
 * This file is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <jansson.h>
#include <event.h>
#include <event2/dns.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <signal.h>

#include "../../core/sr_module.h"
#include "../../core/route.h"
#include "../../core/mem/mem.h"
#include "../../core/action.h"
#include "../../core/route_struct.h"
#include "../../core/lvalue.h"
#include "../../core/cfg/cfg_struct.h"
#include "../../core/rand/fastrand.h"
#include "../tm/tm_load.h"
#include "../jansson/jansson_utils.h"

#include "janssonrpc.h"
#include "janssonrpc_request.h"
#include "janssonrpc_server.h"
#include "janssonrpc_io.h"
#include "janssonrpc_connect.h"
#include "netstring.h"

/* tm module bindings; defined outside this file and used here to resume
 * suspended transactions (see send_to_script). */
extern struct tm_binds tmb;

/* libevent bases used by the IO process; both are created in
 * jsonrpc_io_child_process() and stay NULL until then. */
struct event_base *global_ev_base = NULL;
struct evdns_base *global_evdns_base = NULL;

/* Forward declarations for functions defined later in this file. */
void cmd_pipe_cb(int fd, short event, void *arg);
void io_shutdown(int sig);

/**
 * Main function of the JSONRPC IO process.
 *
 * Creates the libevent and evdns bases, registers a persistent read event
 * on the command pipe (dispatched to cmd_pipe_cb), starts connecting the
 * servers of global_server_group and then runs the event loop.
 *
 * @param cmd_pipe file descriptor the pipe commands are read from
 * @return 0 when the event loop returns without error, -1 if setup fails
 *         or the event loop could not be run
 */
int jsonrpc_io_child_process(int cmd_pipe)
{
	struct event *pipe_ev = NULL;

	global_ev_base = event_base_new();
	if(!global_ev_base) {
		/* nothing below can work without an event base */
		ERR("Failed to create event base\n");
		return -1;
	}

	global_evdns_base = evdns_base_new(global_ev_base, 1);
	if(!global_evdns_base) {
		/* not fatal here: the failure is only reported, as the previous
		 * code carried on with a NULL evdns base as well */
		WARN("Failed to create evdns base\n");
	}

	if(set_non_blocking(cmd_pipe) < 0) {
		WARN("Failed to set command pipe to non-blocking: %s\n",
				strerror(errno));
	}

	pipe_ev = event_new(
			global_ev_base, cmd_pipe, EV_READ | EV_PERSIST, cmd_pipe_cb, NULL);
	if(!pipe_ev) {
		ERR("Failed to create pipe event\n");
		return -1;
	}

	if(event_add(pipe_ev, NULL) < 0) {
		ERR("Failed to start pipe event\n");
		event_free(pipe_ev); /* never registered, so release it here */
		return -1;
	}

	connect_servers(global_server_group);

#if 0
	/* attach shutdown signal handler */
	/* The shutdown handlers are intended to clean up the remaining memory
	 * in the IO process. However, catching the signals causes unpredictable
	 * behavior in the Kamailio shutdown process, so this should be disabled
	 * except when doing memory debugging. */
	struct sigaction sa;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = 0;
	sa.sa_handler = io_shutdown;
	if(sigaction(SIGTERM, &sa, NULL) == -1) {
		ERR("Failed to attach IO shutdown handler to SIGTERM\n");
	} else if(sigaction(SIGINT, &sa, NULL) == -1) {
		ERR("Failed to attach IO shutdown handler to SIGINT\n");
	}
#endif

	if(event_base_dispatch(global_ev_base) < 0) {
		ERR("IO couldn't start event loop\n");
		return -1;
	}
	return 0;
}

/**
 * Signal handler that tears down the IO process state.
 *
 * Within this file it is only referenced from the disabled (#if 0) block
 * in jsonrpc_io_child_process(). While holding jsonrpc_server_group_lock
 * it closes the servers of global_server_group, frees the evdns base,
 * asks the event loop to exit and frees the event base.
 *
 * @param sig signal number (not used)
 */
void io_shutdown(int sig)
{
	INFO("Shutting down JSONRPC IO process...\n");
	lock_get(jsonrpc_server_group_lock); /* blocking */

	/* Project iteration macros; they presumably declare and advance the
	 * `server` variable used in the loop body -- confirm against their
	 * definition. */
	INIT_SERVER_LOOP
	FOREACH_SERVER_IN(global_server_group)
	close_server(server);
	ENDFOR

	evdns_base_free(global_evdns_base, 0);
	/* NOTE(review): the base is freed immediately after requesting loop
	 * exit; confirm this is safe when invoked while the loop is running. */
	event_base_loopexit(global_ev_base, NULL);
	event_base_free(global_ev_base);

	lock_release(jsonrpc_server_group_lock);
}

/**
 * Hand a result value to the script route named in the request command.
 *
 * Stores val in the jsonrpc result pseudo-variable, looks up the route
 * req_cmd->route in main_rt and resumes the transaction identified by
 * req_cmd->t_hash / req_cmd->t_label on that route via tmb.t_continue.
 *
 * @param val     value assigned to the result pseudo-variable
 * @param req_cmd request command carrying the route and transaction ids
 * @return 0 on success; -1 if req_cmd is NULL, has no route, the route is
 *         not found or the transaction could not be resumed
 */
int send_to_script(pv_value_t *val, jsonrpc_req_cmd_t *req_cmd)
{
	int route_idx;

	if(req_cmd == NULL || req_cmd->route.len <= 0) {
		return -1;
	}

	jsonrpc_result_pv.setf(
			req_cmd->msg, &jsonrpc_result_pv.pvp, (int)EQ_T, val);

	route_idx = route_lookup(&main_rt, req_cmd->route.s);
	if(route_idx < 0) {
		ERR("no such route: %s\n", req_cmd->route.s);
		return -1;
	}

	if(tmb.t_continue(req_cmd->t_hash, req_cmd->t_label,
			   main_rt.rlist[route_idx])
			< 0) {
		ERR("Failed to resume transaction\n");
		return -1;
	}

	return 0;
}

/**
 * Build the JSON object used to report an internal (module side) error.
 *
 * The result has the shape
 *   { "internal_error": { "message": <text>, "code": <code>, "data": <data> } }
 * where "data" is only present when data is non-NULL. For
 * JRPC_ERR_BAD_RESP the data is additionally stored under a top-level
 * "data" key.
 *
 * @param code one of the JRPC_ERR_* codes; unknown codes are logged and
 *             reported as "Unknown error"
 * @param data optional JSON value attached to the error (borrowed; the
 *             caller keeps its own reference)
 * @return new JSON object owned by the caller (the return value of
 *         json_object(), which is not checked here)
 */
json_t *internal_error(int code, json_t *data)
{
	char *text;
	json_t *outer = json_object();
	json_t *detail = json_object();

	if(code == JRPC_ERR_REQ_BUILD) {
		text = "Failed to build request";
	} else if(code == JRPC_ERR_SEND) {
		text = "Failed to send";
	} else if(code == JRPC_ERR_BAD_RESP) {
		text = "Bad response result";
		json_object_set(outer, "data", data);
	} else if(code == JRPC_ERR_RETRY) {
		text = "Retry failed";
	} else if(code == JRPC_ERR_SERVER_DISCONNECT) {
		text = "Server disconnected";
	} else if(code == JRPC_ERR_TIMEOUT) {
		text = "Message timeout";
	} else if(code == JRPC_ERR_PARSING) {
		text = "JSON parse error";
	} else if(code == JRPC_ERR_BUG) {
		text = "There is a bug";
	} else {
		ERR("Unrecognized error code: %d\n", code);
		text = "Unknown error";
	}

	/* the *_set_new variants take over the reference of the new value,
	 * which matches the previous set-then-decref sequence */
	json_object_set_new(detail, "message", json_string(text));
	json_object_set_new(detail, "code", json_integer(code));

	if(data != NULL) {
		json_object_set(detail, "data", data);
	}

	json_object_set_new(outer, "internal_error", detail);
	return outer;
}

/**
 * Fail a request: report the error and release the request.
 *
 * If the request has a command with a route, an internal error object is
 * built for code and delivered to the script through send_to_script().
 * When there is no route, or the error could not be converted or
 * delivered, the request payload is logged as a warning together with
 * err_str instead.
 *
 * The request and its command are always freed before returning; req must
 * not be used by the caller afterwards.
 *
 * @param code    JRPC_ERR_* code passed to internal_error()
 * @param req     request to fail; NULL only produces a warning
 * @param err_str human readable description used in the warning
 */
void fail_request(int code, jsonrpc_request_t *req, char *err_str)
{
	char *req_s = NULL;
	char *freeme = NULL;
	pv_value_t val;
	json_t *error = NULL;
	int delivered = 0;

	if(!req) {
		WARN("%s: (null)\n", err_str);
		return;
	}

	if(req->cmd && req->cmd->route.len > 0) {
		int converted;

		error = internal_error(code, req->payload);
		/* val is only valid when the conversion succeeded; never pass an
		 * uninitialised value on to the script */
		converted = (jsontoval(&val, &freeme, error) >= 0);
		if(error)
			json_decref(error);
		if(converted && send_to_script(&val, req->cmd) >= 0) {
			delivered = 1;
		}
	}

	if(!delivered) {
		/* no route, or the script could not be notified: log instead */
		req_s = json_dumps(req->payload, JSON_COMPACT);
		if(req_s) {
			WARN("%s: \n%s\n", err_str, req_s);
			free(req_s);
		} else {
			WARN("%s: (null)\n", err_str);
		}
	}

	if(freeme)
		free(freeme);
	if(req->cmd)
		free_req_cmd(req->cmd);
	free_request(req);
}

void timeout_cb(int fd, short event, void *arg)
{
	jsonrpc_request_t *req = (jsonrpc_request_t *)arg;
	if(!req)
		return;

	if(!(req->server)) {
		ERR("No server defined for request\n");
		return;
	}

	if(schedule_retry(req) < 0) {
		fail_request(JRPC_ERR_TIMEOUT, req, "Request timeout");
	}
}


/**
 * Check whether a server is already part of a list of tried servers.
 *
 * @param server server to look for; NULL is never considered tried
 * @param tried  head of the list of tried servers, may be NULL
 * @return 1 if server is in the list, 0 otherwise
 */
int server_tried(jsonrpc_server_t *server, server_list_t *tried)
{
	server_list_t *node;

	if(server == NULL) {
		return 0;
	}

	for(node = tried; node != NULL; node = node->next) {
		/* server is non-NULL here, so a NULL entry can never match */
		if(node->server == server) {
			return 1;
		}
	}

	return 0;
}

/* A server can take a request when it is connected, has not been tried
 * yet for this request and is below its high water mark (a high water
 * mark <= 0 means no limit). */
static int lb_server_usable(jsonrpc_server_t *srv, server_list_t *tried)
{
	if(srv->status != JSONRPC_SERVER_CONNECTED) {
		return 0;
	}
	if(server_tried(srv, tried)) {
		return 0;
	}
	return srv->hwm <= 0 || srv->req_count < srv->hwm;
}

/* loadbalance_by_weight() randomly picks a server out of a weight group
 * based on the relative weight of the servers.
 *
 * It is loosely inspired by this:
 * http://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
 *
 * The weight stored on the head of the list is used as the combined weight
 * of all servers in the group (maintained by insert_server_group()), so no
 * extra pass is needed to compute it.
 *
 * A random number below the total weight is chosen. The list is walked
 * while adding each server's weight to a running sum and remembering the
 * last usable server seen. The walk stops once the sum is larger than the
 * random number and a usable server has been found; that server is the
 * result.
 *
 * A server with a weight of 0 will almost never be chosen, unless the
 * other servers are not usable.
 *
 * The exception is when the combined weight is 0. In that case the load
 * is spread evenly: a random position in the list is chosen, which
 * requires finding the size of the list beforehand.
 *
 * *s is set to the chosen server, or to NULL when grp is NULL, is not a
 * weight group, is empty or contains no usable server.
 * */
void loadbalance_by_weight(
		jsonrpc_server_t **s, jsonrpc_server_group_t *grp, server_list_t *tried)
{
	jsonrpc_server_group_t *node;
	unsigned int pick;

	*s = NULL;

	if(grp == NULL) {
		ERR("Trying to pick from an empty group\n");
		return;
	}

	if(grp->type != WEIGHT_GROUP) {
		ERR("Trying to pick from a non weight group\n");
		return;
	}

	if(grp->weight == 0) {
		/* all weights are zero: choose a random position instead */
		unsigned int count = server_group_size(grp);
		int idx = 0;

		if(count == 0) {
			return;
		}

		pick = fastrand_max(count - 1);

		/* walk up to the picked position, and beyond it for as long as
		 * no usable server has been found */
		node = grp;
		while(node != NULL && (idx <= pick || *s == NULL)) {
			if(lb_server_usable(node->server, tried)) {
				*s = node->server;
			}
			idx++;
			node = node->next;
		}
		return;
	}

	pick = fastrand_max(grp->weight - 1);

	unsigned int running = 0;
	for(node = grp; node != NULL; node = node->next) {
		if(lb_server_usable(node->server, tried)) {
			*s = node->server;
		}
		running += node->server->weight;
		if(running > pick && *s != NULL) {
			break;
		}
	}
}

/**
 * Send a request to a server of the given connection group.
 *
 * The payload is serialised and netstring encoded, then written to a
 * server selected by loadbalance_by_weight(). Priority groups are tried
 * in list order; within a priority group servers whose write failed are
 * remembered and not picked again. If no server accepted the data a retry
 * is scheduled; if that fails too the request is failed (and freed) with
 * JRPC_ERR_RETRY.
 *
 * After a successful send a notification request is freed right away,
 * while a normal request gets a timeout timer (timeout_cb).
 *
 * @param conn        name of the connection group
 * @param req         request to send
 * @param notify_only true for notifications (no response expected)
 * @return 1 if the data was written to a server, 0 if it was not (the
 *         request is then pending retry or already freed), -1 on an
 *         internal error; in the -1 case req has not been freed
 */
int jsonrpc_send(str conn, jsonrpc_request_t *req, bool notify_only)
{
	char *json = NULL;
	bool sent = false;
	char *ns = NULL;
	size_t bytes;

	json = (char *)json_dumps(req->payload, JSON_COMPACT);
	if(json == NULL) {
		LM_ERR("failed to do json dump for request payload\n");
		return -1;
	}
	bytes = netstring_encode_new(&ns, json, (size_t)strlen(json));
	if(ns == NULL) {
		/* without an encoded buffer there is nothing to write */
		LM_ERR("failed to netstring encode request payload\n");
		free(json);
		return -1;
	}

	jsonrpc_server_group_t *c_grp = NULL;
	if(global_server_group != NULL)
		c_grp = *global_server_group;
	jsonrpc_server_group_t *p_grp = NULL;
	jsonrpc_server_group_t *w_grp = NULL;
	jsonrpc_server_t *s = NULL;
	server_list_t *tried_servers = NULL;
	DEBUG("SENDING DATA\n");
	for(; c_grp != NULL; c_grp = c_grp->next) {

		/* NOTE(review): only the first c_grp->conn.len bytes are compared,
		 * so a longer conn name with the same prefix also matches --
		 * confirm whether an exact match is intended. */
		if(strncmp(conn.s, c_grp->conn.s, c_grp->conn.len) != 0)
			continue;

		for(p_grp = c_grp->sub_group; p_grp != NULL; p_grp = p_grp->next) {
			w_grp = p_grp->sub_group;
			while(!sent) {
				loadbalance_by_weight(&s, w_grp, tried_servers);
				if(s == NULL || s->status != JSONRPC_SERVER_CONNECTED) {
					break;
				}

				if(bufferevent_write(s->bev, ns, bytes) == 0) {
					sent = true;
					if(!notify_only) {
						/* notifications get no response, so they do not
						 * count as outstanding requests */
						s->req_count++;
						if(s->hwm > 0 && s->req_count >= s->hwm) {
							WARN("%.*s:%d in connection group %.*s has "
								 "exceeded its high water mark (%d)\n",
									STR(s->addr), s->port, STR(s->conn),
									s->hwm);
						}
					}
					req->server = s;
					break;
				} else {
					/* remember the failure so the balancer skips it */
					addto_server_list(s, &tried_servers);
				}
			}

			if(sent) {
				break;
			}

			WARN("Failed to send to priority group, %d\n", p_grp->priority);
			if(p_grp->next != NULL) {
				INFO("Proceeding to next priority group, %d\n",
						p_grp->next->priority);
			}
		}

		if(sent) {
			break;
		}
	}

	if(!sent) {
		WARN("Failed to send to connection group, \"%.*s\"\n", STR(conn));
		if(schedule_retry(req) < 0) {
			/* frees req; it is not touched again below since !sent */
			fail_request(JRPC_ERR_RETRY, req, "Failed to schedule retry");
		}
	}

	free_server_list(tried_servers);
	pkg_free(ns);
	free(json);

	if(sent) {
		if(notify_only
				== true) { // free the request if using janssonrpc_notification function
			free_req_cmd(req->cmd);
			req->cmd = NULL;
			free_request(req);
		} else {
			struct timeval tv;
			jsr_ms_to_tv(req->timeout, tv);

			req->timeout_ev =
					evtimer_new(global_ev_base, timeout_cb, (void *)req);
			if(req->timeout_ev == NULL) {
				/* event_add() must not be called with a NULL event */
				ERR("evtimer_new failed while setting request timer\n");
				return -1;
			}
			if(event_add(req->timeout_ev, &tv) < 0) {
				ERR("event_add failed while setting request timer (%s).",
						strerror(errno));
				return -1;
			}
		}
	}

	return sent;
}


void cmd_pipe_cb(int fd, short event, void *arg)
{
	struct jsonrpc_pipe_cmd *cmd;

	if(read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
		ERR("FATAL ERROR: failed to read from command pipe: %s\n",
				strerror(errno));
		return;
	}

	cfg_update();

	switch(cmd->type) {
		case CMD_CLOSE:
			if(cmd->server) {
				wait_close(cmd->server);
			}
			goto end;
			break;
		case CMD_RECONNECT:
			if(cmd->server) {
				wait_reconnect(cmd->server);
			}
			goto end;
			break;
		case CMD_CONNECT:
			if(cmd->server) {
				bev_connect(cmd->server);
			}
			goto end;
			break;
		case CMD_UPDATE_SERVER_GROUP:
			if(cmd->new_grp) {
				jsonrpc_server_group_t *old_grp = *global_server_group;
				*global_server_group = cmd->new_grp;
				free_server_group(&old_grp);
			}
			lock_release(jsonrpc_server_group_lock);
			goto end;
			break;

		case CMD_SEND:
			break;

		default:
			ERR("Unrecognized pipe command: %d\n", cmd->type);
			goto end;
			break;
	}

	/* command is SEND */

	jsonrpc_req_cmd_t *req_cmd = cmd->req_cmd;
	if(req_cmd == NULL) {
		ERR("req_cmd is NULL. Invalid send command\n");
		goto end;
	}

	jsonrpc_request_t *req = NULL;
	req = create_request(req_cmd);
	if(!req || !req->payload) {
		json_t *error = internal_error(JRPC_ERR_REQ_BUILD, NULL);
		pv_value_t val;
		char *freeme = NULL;
		jsontoval(&val, &freeme, error);
		if(req_cmd->route.len <= 0 && send_to_script(&val, req_cmd) < 0) {
			ERR("Failed to build request (method: %.*s, params: %.*s)\n",
					STR(req_cmd->method), STR(req_cmd->params));
		}
		if(freeme)
			free(freeme);
		if(error)
			json_decref(error);
		free_req_cmd(req_cmd);
		if(req)
			pkg_free(req);
		goto end;
	}

	int sent = jsonrpc_send(req_cmd->conn, req, req_cmd->notify_only);

	char *type;
	if(sent < 0) {
		if(req_cmd->notify_only == false) {
			type = "Request";
		} else {
			type = "Notification";
		}
		WARN("%s could not be sent to connection group: %.*s\n", type,
				STR(req_cmd->conn));
		fail_request(JRPC_ERR_SEND, req, "Failed to send request");
	}

end:
	free_pipe_cmd(cmd);
}

/**
 * Process one parsed JSON-RPC response.
 *
 * Validates the response (must be an object carrying the expected
 * "jsonrpc" version string and an "id"), looks up the pending request for
 * that id and passes an object holding the "result" and/or "error" member
 * to the script route of the request.
 *
 * If the script cannot be notified and the response carries an error whose
 * "code" lies within one of global_retry_ranges, a retry is scheduled and
 * the request is kept. In every other case in which a request was found,
 * the request and its command are freed here or by fail_request().
 *
 * @param response parsed response; borrowed, not released here
 * @return 0 on success or when the id is not recognised, -1 when the
 *         response is malformed or could not be converted
 */
int handle_response(json_t *response)
{
	int retval = 0;
	jsonrpc_request_t *req = NULL;
	json_t *return_obj = NULL;
	json_t *internal = NULL;
	char *freeme = NULL;


	/* check if json object */
	if(!json_is_object(response)) {
		WARN("jsonrpc response is not an object\n");
		return -1;
	}

	/* check version */
	json_t *version = json_object_get(response, "jsonrpc");
	if(!version) {
		WARN("jsonrpc response does not have a version.\n");
		retval = -1;
		goto end;
	}

	const char *version_s = json_string_value(version);
	if(!version_s) {
		WARN("jsonrpc response version is not a string.\n");
		retval = -1;
		goto end;
	}

	/* the version must match JSONRPC_VERSION exactly (length and content) */
	if(strlen(version_s) != (sizeof(JSONRPC_VERSION) - 1)
			|| strncmp(version_s, JSONRPC_VERSION, sizeof(JSONRPC_VERSION) - 1)
					   != 0) {
		WARN("jsonrpc response version is not %s. version: %s\n",
				JSONRPC_VERSION, version_s);
		retval = -1;
		goto end;
	}

	/* check for an id */
	json_t *_id = json_object_get(response, "id");
	if(!_id) {
		WARN("jsonrpc response does not have an id.\n");
		retval = -1;
		goto end;
	}

	/* pop_request() presumably removes the request from the table of
	 * pending requests -- confirm against its definition */
	int id = json_integer_value(_id);
	if(!(req = pop_request(id))) {
		/* don't fail the server for an unrecognized id */
		retval = 0;
		goto end;
	}

	/* object handed to the script; holds "result" and/or "error" */
	return_obj = json_object();

	json_t *error = json_object_get(response, "error");
	// an "error" member whose value is JSON null counts as no error
	bool _error = error && (json_typeof(error) != JSON_NULL);

	json_t *result = json_object_get(response, "result");

	if(_error) {
		json_object_set(return_obj, "error", error);
	}

	if(result) {
		json_object_set(return_obj, "result", result);
	}

	/* exactly one of result / error is expected; anything else is a bad
	 * response and the internal error members are merged into the object */
	if((!result && !_error) || (result && _error)) {
		WARN("bad response\n");
		internal = internal_error(JRPC_ERR_BAD_RESP, req->payload);
		json_object_update(return_obj, internal);
		if(internal)
			json_decref(internal);
	}

	pv_value_t val;

	if(jsontoval(&val, &freeme, return_obj) < 0) {
		/* fail_request() frees req, so skip the free_and_end label */
		fail_request(JRPC_ERR_TO_VAL, req,
				"Failed to convert response json to pv\n");
		retval = -1;
		goto end;
	}

	char *error_s = NULL;

	/* script notified: nothing left to do but release the request */
	if(send_to_script(&val, req->cmd) >= 0) {
		goto free_and_end;
	}

	/* the script could not be notified; handle an error response here */
	if(_error) {
		// get code from error
		json_t *_code = json_object_get(error, "code");
		if(_code) {
			int code = json_integer_value(_code);

			// check if code is in global_retry_ranges
			// (ranges are inclusive and may be given in either order)
			retry_range_t *tmpr;
			for(tmpr = global_retry_ranges; tmpr != NULL; tmpr = tmpr->next) {
				if((tmpr->start < tmpr->end && tmpr->start <= code
						   && code <= tmpr->end)
						|| (tmpr->end < tmpr->start && tmpr->end <= code
								&& code <= tmpr->start)
						|| (tmpr->start == tmpr->end && tmpr->start == code)) {
					/* retry scheduled: req stays alive, so do not free it */
					if(schedule_retry(req) == 0) {
						goto end;
					}
					break;
				}
			}
		}
		error_s = json_dumps(error, JSON_COMPACT);
		if(error_s) {
			WARN("Request received an error: \n%s\n", error_s);
			free(error_s);
		} else {
			/* fail_request() frees req, so skip the free_and_end label */
			fail_request(JRPC_ERR_BAD_RESP, req,
					"Could not convert 'error' response to string");
			retval = -1;
			goto end;
		}
	}


free_and_end:
	free_req_cmd(req->cmd);
	free_request(req);

end:
	if(freeme)
		free(freeme);
	if(return_obj)
		json_decref(return_obj);
	return retval;
}

/**
 * Handle one complete netstring received from a server.
 *
 * Decrements the server's count of outstanding requests (logging when the
 * server drops back below its high water mark), parses the netstring
 * content as JSON and passes it to handle_response().
 *
 * @param server server whose buffer holds the received netstring
 */
void handle_netstring(jsonrpc_server_t *server)
{
	unsigned int old_count = server->req_count;

	/* never count below zero: a netstring that does not answer an
	 * outstanding request must not wrap the counter */
	if(server->req_count > 0) {
		server->req_count--;
	}
	if(server->hwm > 0 && old_count >= server->hwm
			&& server->req_count < server->hwm) {
		INFO("%.*s:%d in connection group %.*s is back to normal\n",
				STR(server->addr), server->port, STR(server->conn));
	}

	json_error_t error;

	json_t *res = json_loads(server->buffer->string, 0, &error);

	if(res) {
		if(handle_response(res) < 0) {
			ERR("Cannot handle jsonrpc response: %s\n", server->buffer->string);
		}
		json_decref(res);
	} else {
		ERR("Failed to parse json: %s\n", server->buffer->string);
		ERR("PARSE ERROR: %s at %d,%d\n", error.text, error.line, error.column);
	}
}

/**
 * bufferevent read callback of a server connection.
 *
 * Reads netstrings from the bufferevent into server->buffer and handles
 * every complete one. Returns when the pending netstring is incomplete.
 * A malformed netstring is logged and the connection is reconnected via
 * force_reconnect().
 *
 * @param bev bufferevent with readable data
 * @param arg the jsonrpc_server_t the bufferevent belongs to
 */
void bev_read_cb(struct bufferevent *bev, void *arg)
{
	jsonrpc_server_t *server = (jsonrpc_server_t *)arg;

	/* Keep going while complete netstrings are available; every way out
	 * of the loop is an explicit return below. */
	for(;;) {
		const char *msg = NULL;
		int retval = netstring_read_evbuffer(bev, &server->buffer);

		if(retval == NETSTRING_INCOMPLETE) {
			/* wait for more data */
			return;
		}

		if(retval < 0) {
			switch(retval) {
				case NETSTRING_ERROR_TOO_LONG:
					msg = "too long";
					break;
				case NETSTRING_ERROR_NO_COLON:
					msg = "no colon after length field";
					break;
				case NETSTRING_ERROR_TOO_SHORT:
					msg = "too short";
					break;
				case NETSTRING_ERROR_NO_COMMA:
					msg = "missing comma";
					break;
				case NETSTRING_ERROR_LEADING_ZERO:
					msg = "length field has a leading zero";
					break;
				case NETSTRING_ERROR_NO_LENGTH:
					msg = "missing length field";
					break;
				default:
					break;
			}

			if(msg != NULL) {
				ERR("bad netstring: %s\n", msg);
			} else {
				ERR("bad netstring: unknown error (%d)\n", retval);
			}
			force_reconnect(server);
			return;
		}

		handle_netstring(server);
		free_netstring(server->buffer);
		server->buffer = NULL;
	}
}

/**
 * Put a file descriptor into non-blocking mode.
 *
 * Adds O_NONBLOCK to the file status flags currently set on fd.
 *
 * @param fd file descriptor to modify
 * @return 0 on success; the negative result of fcntl(F_GETFL) if the
 *         flags cannot be read, -1 if they cannot be written
 */
int set_non_blocking(int fd)
{
	int current = fcntl(fd, F_GETFL);

	if(current < 0) {
		return current;
	}

	return (fcntl(fd, F_SETFL, current | O_NONBLOCK) < 0) ? -1 : 0;
}

/**
 * Allocate a zero-initialised pipe command in shared memory.
 *
 * @return the new command, or NULL if the allocation failed; release it
 *         with free_pipe_cmd()
 */
jsonrpc_pipe_cmd_t *create_pipe_cmd()
{
	jsonrpc_pipe_cmd_t *pipe_cmd = shm_malloc(sizeof(*pipe_cmd));

	if(pipe_cmd == NULL) {
		ERR("Failed to malloc pipe cmd.\n");
		return NULL;
	}

	memset(pipe_cmd, 0, sizeof(*pipe_cmd));
	return pipe_cmd;
}

/**
 * Release a pipe command allocated by create_pipe_cmd().
 *
 * Only the command structure itself is freed, not what it points to.
 *
 * @param cmd command to free; NULL is ignored
 */
void free_pipe_cmd(jsonrpc_pipe_cmd_t *cmd)
{
	if(cmd != NULL) {
		shm_free(cmd);
	}
}

/**
 * Allocate a request command in shared memory.
 *
 * The structure is zeroed and its conn, method, params and route strings
 * are set to null_str.
 *
 * @return the new request command; on allocation failure
 *         CHECK_MALLOC_NULL handles the error (presumably by returning
 *         NULL -- confirm against the macro definition)
 */
jsonrpc_req_cmd_t *create_req_cmd()
{
	jsonrpc_req_cmd_t *new_cmd = shm_malloc(sizeof(*new_cmd));

	CHECK_MALLOC_NULL(new_cmd);
	memset(new_cmd, 0, sizeof(*new_cmd));

	new_cmd->route = null_str;
	new_cmd->params = null_str;
	new_cmd->method = null_str;
	new_cmd->conn = null_str;

	return new_cmd;
}

/**
 * Release a request command together with its conn, method, params and
 * route strings.
 *
 * @param req_cmd request command to free; NULL is ignored
 */
void free_req_cmd(jsonrpc_req_cmd_t *req_cmd)
{
	if(req_cmd == NULL) {
		return;
	}

	CHECK_AND_FREE(req_cmd->conn.s);
	CHECK_AND_FREE(req_cmd->method.s);
	CHECK_AND_FREE(req_cmd->params.s);
	CHECK_AND_FREE(req_cmd->route.s);

	shm_free(req_cmd);
}

/**
 * Create a pipe command and write its pointer to the command pipe.
 *
 * data is stored in the command field matching the type: a
 * jsonrpc_server_t for connect / reconnect / close, a
 * jsonrpc_server_group_t for a server group update and a
 * jsonrpc_req_cmd_t for send. Only the command pointer is written to the
 * pipe.
 *
 * @param type kind of command to send
 * @param data payload matching the command type
 * @return 0 on success, -1 on an unknown type or if the write failed (the
 *         pipe command is freed in both cases, data is left untouched)
 */
int send_pipe_cmd(cmd_type type, void *data)
{
	char *label = "";
	jsonrpc_pipe_cmd_t *pipe_cmd = create_pipe_cmd();

	CHECK_MALLOC(pipe_cmd);

	pipe_cmd->type = type;

	if(type == CMD_CONNECT) {
		pipe_cmd->server = (jsonrpc_server_t *)data;
		label = "connect";
	} else if(type == CMD_RECONNECT) {
		pipe_cmd->server = (jsonrpc_server_t *)data;
		label = "reconnect";
	} else if(type == CMD_CLOSE) {
		pipe_cmd->server = (jsonrpc_server_t *)data;
		label = "close";
	} else if(type == CMD_UPDATE_SERVER_GROUP) {
		pipe_cmd->new_grp = (jsonrpc_server_group_t *)data;
		label = "update";
	} else if(type == CMD_SEND) {
		pipe_cmd->req_cmd = (jsonrpc_req_cmd_t *)data;
		label = "send";
	} else {
		ERR("Unknown command type %d", type);
		free_pipe_cmd(pipe_cmd);
		return -1;
	}

	DEBUG("sending %s command\n", label);

	/* the pointer value itself travels through the pipe */
	if(write(cmd_pipe, &pipe_cmd, sizeof(pipe_cmd)) != sizeof(pipe_cmd)) {
		ERR("Failed to send '%s' cmd to io pipe: %s\n", label, strerror(errno));
		free_pipe_cmd(pipe_cmd);
		return -1;
	}

	return 0;
}
